接下來,我們將展示完整的 Python 程式碼。通過這個範例,你會體驗到 Airflow 如何幫助你簡化工作流程,提高效率。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
# 定義 DAG 參數
default_args = {
'owner': 'abc',
'start_date': datetime(2023, 1, 1),
'retries': 0,
'email': ['abc@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
}
dag = DAG(
'dag_template',
default_args=default_args,
description='dag_template',
schedule_interval=timedelta(days=1), # 每天執行一次
catchup=False,
)
# 爬蟲腳本
def crawl_and_store_to_csv():
print("這function是給你放python code的地方")
# PythonOperator
crawl_and_store_csv_task = PythonOperator(
task_id='crawl_and_store_csv',
python_callable=crawl_and_store_to_csv,
dag=dag,
)
# PostgresOperator
create_data_table_task = PostgresOperator(
task_id='create_data_table',
postgres_conn_id="postgres_conn",
sql='''
CREATE TABLE IF NOT EXISTS a_table (
id SERIAL PRIMARY KEY,
title VARCHAR(255),
subtitle VARCHAR(255),
);
''',
dag=dag,
)
# EmailOperator
send_email_task = EmailOperator(
task_id='send_email',
to=['efg@gmail.com'],
subject='Airflow - 匯出報告',
html_content='<p>Your Airflow job has finished.</p><p>Date: {{ execution_date }}</p>',
files=['/opt/airflow/Downloads/report_20230619010550.pdf'],
dag=dag
)
# 設置 DAG 的任務依賴關係
crawl_and_store_csv_task >> create_data_table_task >> send_email_task
以前都要一步一步手動執行,現在有了Airflow,省下很多時間與重複性的工作。而且,如果今天想跟閨密逛街吃飯或是跟兄弟打球,不用跟朋友說:「抱歉我下午3點不行,可以改約其他時間嗎?」直接一句話:「走啊!我都行!」這就是Airflow很讚的地方,不用害怕哪個時段一定要坐在電腦前執行某些程式只能哭哭不能出去玩。Airflow帶你飛,讓你成為時間的主人而不是僕人!